package main

import (
	"gitee.com/k8sio/cnkit/queue/delayqueue"
	"github.com/RoaringBitmap/roaring/roaring64"
	"github.com/go-redis/redis/v8"
	"log"
	"strconv"
	"sync"
	"time"
)

var (
	sbm    = roaring64.New()
	gbm    = roaring64.New()
	t1, t2 time.Time
	count  int
	lock   sync.Mutex
	queue  *delayqueue.DelayQueue
)

func init() {
	log.SetFlags(log.LstdFlags | log.Lshortfile | log.Lmicroseconds)

	r2 := redis.NewUniversalClient(&redis.UniversalOptions{
		Addrs:    []string{"127.0.0.1:6379"},
		DB:       0,
		PoolSize: 10,
	})

	redisCli := r2
	client := delayqueue.NewRedisV8Wrapper(redisCli)

	queue = delayqueue.NewQueue0(
		"test:delay0",
		client,
		delayqueue.UseHashTagKey(),
		cbProcess,
	).WithConcurrent(10).WithFetchLimit(10000)
}
func cbProcess(payload string) bool {
	lock.Lock()
	defer lock.Unlock()

	if count == 0 {
		t2 = time.Now()
		log.Printf("cbProcess begin...")
	}
	count++

	i, _ := strconv.ParseUint(payload, 10, 64)
	if gbm.Contains(i) {
		log.Fatalf("duplicate payload %v", i)
	}
	gbm.Add(i)
	return true
}

func main() {
	var ss []string

	//sbm.AddRange(0, 1000000)
	for i := uint64(0); i < 10000; i++ {
		sbm.Add(i)
		ss = append(ss, strconv.FormatUint(i, 10))
	}

	t1 = time.Now()
	log.Printf("put msg len %v begin...", len(ss))
	err := queue.SendDelayMsgBatch(ss, time.Second*10, len(ss), delayqueue.WithRetryCount(3))
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("put msg len %v done, cost %v", len(ss), time.Since(t1))

	go func() {
		defer log.Println("watch quit")
		for {
			if sbm.Equals(gbm) {
				log.Println("sbm", sbm.GetCardinality())
				log.Println("gbm", gbm.GetCardinality())
				log.Printf("cbProcess finish, cost %v", time.Since(t2))
				queue.StopConsume()
				return
			}
			time.Sleep(time.Second)
		}
	}()

	log.Printf("consume start")
	// start consume
	//queue.StopConsume()
	done := queue.StartConsume()
	<-done
	log.Println("consume done")
}
